73692f39fffbf9b3cd212ab63497ba8d95019ee8,components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java,MinaComponent,createDatagramEndpoint,#String#MinaConfiguration#,254

Before Change


        boolean sync = configuration.isSync();
        List<IoFilter> filters = configuration.getFilters();

        IoAcceptor acceptor = new DatagramAcceptor(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaDatagramAcceptor"));
        IoConnector connector = new DatagramConnector(getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "MinaDatagramConnector"));
        SocketAddress address = new InetSocketAddress(configuration.getHost(), configuration.getPort());

After Change


        boolean sync = configuration.isSync();
        List<IoFilter> filters = configuration.getFilters();

        ExecutorService acceptorPool = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "MinaDatagramAcceptor");
        ExecutorService connectorPool = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "MinaDatagramConnector");
        ExecutorService workerPool = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "MinaThreadPool");

        IoAcceptor acceptor = new DatagramAcceptor(acceptorPool);
        IoConnector connector = new DatagramConnector(connectorPool);
        SocketAddress address = new InetSocketAddress(configuration.getHost(), configuration.getPort());

        if (transferExchange) {
            throw new IllegalArgumentException("transferExchange=true is not supported for datagram protocol");
        }

        DatagramConnectorConfig connectorConfig = new DatagramConnectorConfig();
        // must use manual thread model according to Mina documentation
        connectorConfig.setThreadModel(ThreadModel.MANUAL);
        configureDataGramCodecFactory("MinaProducer", connectorConfig, configuration);
        connectorConfig.getFilterChain().addLast("threadPool", new ExecutorFilter(workerPool));
        if (minaLogger) {
            connectorConfig.getFilterChain().addLast("logger", new LoggingFilter());
        }
        appendIoFiltersToChain(filters, connectorConfig.getFilterChain());
        // set the connect timeout on mina, converted to seconds
        connectorConfig.setConnectTimeout((int) (timeout / 1000));

        DatagramAcceptorConfig acceptorConfig = new DatagramAcceptorConfig();
        // must use manual thread model according to Mina documentation
        acceptorConfig.setThreadModel(ThreadModel.MANUAL);
        configureDataGramCodecFactory("MinaConsumer", acceptorConfig, configuration);
        acceptorConfig.setDisconnectOnUnbind(true);
        // reuse address defaults to true for datagram
        acceptorConfig.getFilterChain().addLast("threadPool", new ExecutorFilter(workerPool));
        if (minaLogger) {
            acceptorConfig.getFilterChain().addLast("logger", new LoggingFilter());
        }
        appendIoFiltersToChain(filters, acceptorConfig.getFilterChain());

        MinaEndpoint endpoint = new MinaEndpoint(uri, this);
        endpoint.setAddress(address);
        endpoint.setAcceptor(acceptor);
        endpoint.setAcceptorConfig(acceptorConfig);
        endpoint.setConnector(connector);
        endpoint.setConnectorConfig(connectorConfig);
        endpoint.setConfiguration(configuration);

        // enlist the thread pools in use on the endpoint
        endpoint.addThreadPool(acceptorPool);
        endpoint.addThreadPool(connectorPool);
        endpoint.addThreadPool(workerPool);